The Spark shell lets us interact with data that is distributed on disk or in memory across many machines. Spark provides both a Scala shell and a Python shell.
Make a copy of conf/log4j.properties.template called conf/log4j.properties and find the following line:
log4j.rootCategory=INFO, console
And change it to
log4j.rootCategory=WARN, console
In [21]:
lines = sc.textFile('file:///usr/local/spark/README.md')
In [22]:
lines
Out[22]:
In [23]:
lines.count()
Out[23]:
In [24]:
lines.first()
Out[24]:
Every Spark application consists of a driver program that launches various parallel operations on a cluster.
Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster.
Driver programs manage a number of nodes called executors.
In standalone applications, such as scripts, we have to initialize our own SparkContext.
In Java and Scala, one has to give the application a Maven dependency on the spark-core artifact.
In Python, the application must be run using the bin/spark-submit script.
In [25]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local').setAppName('My App')
#sc = SparkContext(conf=conf)  # SparkContext is already running inside the IPython notebook
A Resilient Distributed Dataset (RDD) is a distributed collection of elements.
In Spark, all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.
Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.
RDDs can contain any type of Python, Java, or Scala objects, including user defined classes.
Creating an RDD: either load an external dataset (e.g., with sc.textFile()) or parallelize a collection of objects in the driver program (with sc.parallelize()).
Once created, RDDs offer two types of operations: transformations and actions.
Spark performs transformations in a lazy fashion, i.e., transformations are only computed when an action is called.
Spark RDDs are by default recomputed each time you run an action on them. To avoid this, persist the RDD:
In [26]:
# rdd.persist()
Every Spark application will work as follows: create some input RDDs from external data; transform them to define new RDDs using transformations like filter(); ask Spark to persist() any intermediate RDDs that will need to be reused; and launch actions such as count() and first() to kick off a parallel computation.
In [27]:
lines = sc.parallelize(['pandas', 'I like pandas'])
lines
Out[27]:
In [28]:
lines = sc.textFile('file:///usr/local/spark/README.md')
lines
Out[28]:
Transformed RDDs are computed lazily, only when you use them in an action.
In [29]:
inputRDD = sc.textFile('log.txt')
errorsRDD = inputRDD.filter(lambda x: 'error' in x)
warningsRDD = inputRDD.filter(lambda x: 'warning' in x)
# badLinesRDD = errorsRDD.union(warningsRDD)
Spark keeps track of the set of dependencies between different RDDs, called the lineage graph.
It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost.
Actions are operations that return a final value to the driver program or write data to an external storage system.
In [30]:
# print 'Input had ' + str(badLinesRDD.count()) + ' concerning lines'
# print 'Here are 10 examples:'
# for line in badLinesRDD.take(10):
#     print line
RDDs also have a collect() function to retrieve the entire RDD.
For large RDDs that do not fit in memory on the driver, it is better to save the contents using the saveAsTextFile() function.
When we call a transformation on an RDD, the operation is not immediately performed.
Think of each RDD as consisting of instructions on how to compute the data that we build up through transformations.
Most of Spark’s transformations, and some of its actions, depend on passing in functions that are used by Spark to compute data.
There are three options for passing functions in Python: lambda expressions, top-level functions, and locally defined functions.
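A minimal sketch of the three options (using the lines RDD loaded earlier; contains_error is a name chosen here for illustration):

errors = lines.filter(lambda line: 'error' in line)           # 1. lambda expression

def contains_error(line):                                      # 2. top-level function
    return 'error' in line
errors = lines.filter(contains_error)

def get_errors(rdd):                                           # 3. locally defined function
    def contains_error(line):
        return 'error' in line
    return rdd.filter(contains_error)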
In [31]:
lines = sc.parallelize(['hello world', 'hi'])
In [32]:
words = lines.map(lambda line: line.split())
words.collect()
Out[32]:
In [33]:
words = lines.flatMap(lambda line: line.split())
words.collect()
Out[33]:
Note: the return type of the result in reduce() and fold() should be the same type as that of the elements in the RDD we are operating over.
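For example, a minimal sketch on a small numeric RDD (values shown in comments are what these calls would return):

nums = sc.parallelize([1, 2, 3, 4])
nums.reduce(lambda x, y: x + y)        # 10
nums.fold(0, lambda x, y: x + y)       # 10; fold() also takes a zero value for each partition
# aggregate() lifts the same-type restriction; here it returns a (sum, count) pair:
nums.aggregate((0, 0),
               lambda acc, v: (acc[0] + v, acc[1] + 1),
               lambda a, b: (a[0] + b[0], a[1] + b[1]))   # (10, 4)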
Some functions are available only on certain types of RDDs, such as mean() and variance() on numeric RDDs or join() on key/value pair RDDs.
In Scala and Java, these methods aren’t defined on the standard RDD class, so to access this additional functionality we have to make sure we get the correct specialized class.
To avoid computing an RDD multiple times, we can ask Spark to persist the data.
In [34]:
# from pyspark import StorageLevel
# rdd.persist(StorageLevel.MEMORY_ONLY)
Spark provides special operations on RDDs containing key/value pairs. These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network.
In [35]:
pairs = words.map(lambda w: (w, 1))
pairs.collect()
Out[35]:
In [36]:
## Word Count
rdd = sc.textFile('file:///usr/local/spark/README.md')
words = rdd.flatMap(lambda x: x.split())
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
result.take(10)
Out[36]:
Spark will always try to infer a sensible default value based on the size of your cluster, but in some cases you will want to tune the level of parallelism for better performance.
Pass the number of partitions as a parameter to operations that shuffle data, or use repartition() or coalesce() on an existing RDD for tuning.
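For example, a minimal sketch (partition counts here are illustrative):

data = sc.parallelize([('a', 3), ('b', 4), ('a', 1)])
counts = data.reduceByKey(lambda x, y: x + y, 10)   # request 10 partitions for the shuffle
counts.getNumPartitions()                           # 10
counts.repartition(4)                               # reshuffle into 4 partitions
counts.coalesce(2)                                  # reduce to 2 partitions, avoiding a full shuffle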
Spark programs can choose to control their RDDs’ partitioning to reduce communication.
Use partitionBy() transformation at the start of the program.
Use rdd.partitioner in Scala and Java to determine the partitioner
Operations that benefit from partitioning include cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup().
While Spark’s HashPartitioner and RangePartitioner are well suited to many use cases, Spark also allows you to tune how an RDD is partitioned by providing a custom Partitioner object.
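In Python there is no Partitioner class to extend; instead, a sketch of custom partitioning passes a hash function to partitionBy() (the first-letter scheme below is purely illustrative):

def first_letter_hash(key):
    # route keys by their first character so that words sharing a first letter
    # end up in the same partition
    return hash(key[0])

pairs = sc.parallelize([('apple', 1), ('avocado', 2), ('banana', 3)])
partitioned = pairs.partitionBy(4, first_letter_hash).persist()
partitioned.getNumPartitions()   # 4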
Three common sets of data sources: file formats and filesystems, structured data sources through Spark SQL, and databases and key/value stores.
Loading:
sc.textFile() - load a single text file as an RDD; each input line becomes an element in the RDD.
sc.wholeTextFiles() - load multiple whole text files at the same time into a pair RDD, with the key being the name and the value being the contents of each file.
Saving:
result.saveAsTextFile() - The path is treated as a directory and Spark will output multiple files underneath that directory.
Loading the data as a text file and then parsing the JSON data is an approach that we can use in all of the supported languages. This works assuming that you have one JSON record per row.
In [37]:
import json
data = rdd.map(lambda x: json.loads(x))
# data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)
Loading CSV/TSV data is similar to loading JSON data in that we can first load it as text and then process it.
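A minimal sketch of parsing CSV lines with Python's csv module (assumes one record per line with no embedded newlines; the file path is hypothetical):

import csv
from io import StringIO   # on Python 2, use StringIO.StringIO

def load_record(line):
    # parse a single CSV line into a list of fields
    return next(csv.reader(StringIO(line)))

# csv_rdd = sc.textFile('file:///path/to/data.csv').map(load_record)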
SequenceFiles are a popular Hadoop format composed of flat files with key/value pairs. SequenceFiles have sync markers that allow Spark to seek to a point in the file and then resynchronize with the record boundaries. This allows Spark to efficiently read SequenceFiles in parallel from multiple nodes.
Use sc.sequenceFile() function to read sequence files
Use pairRDD.saveAsSequenceFile() to save sequence file
Use sc.objectFile() to read an object file
Use rdd.saveAsObjectFile() to save an object file
In Python, use saveAsPickleFile() and pickleFile() instead.
Use sc.hadoopFile() to load a file with the old Hadoop API
Use sc.newAPIHadoopFile() to load a file with the new Hadoop API
Use rdd.saveAsHadoopFile() to save an RDD with the old Hadoop API
Use rdd.saveAsNewAPIHadoopFile() to save an RDD with the new Hadoop API
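For example, a minimal sketch of saving a pair RDD as a SequenceFile and reading it back (the output path is hypothetical):

pairs = sc.parallelize([('panda', 3), ('pink', 4)])
# pairs.saveAsSequenceFile('hdfs:///tmp/seq-output')
# reloaded = sc.sequenceFile('hdfs:///tmp/seq-output',
#                            'org.apache.hadoop.io.Text',
#                            'org.apache.hadoop.io.IntWritable')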
When working with big data, we often need to use compressed data to save storage space and reduce network overhead.
Spark supports a large number of filesystems for reading and writing to, which we can use with any of the file formats we want.
While Spark supports loading files from the local filesystem, it requires that the files are available at the same path on all nodes in your cluster.
Spark SQL is a component for working with structured and semistructured data. By structured data, we mean data that has a schema, that is, a consistent set of fields across data records.
Hive can store tables in a variety of formats, from plain text to column-oriented formats, inside HDFS or other storage systems. Spark SQL can load any table supported by Hive.
To load JSON data, first create a HiveContext as when using Hive. Then use the HiveContext.jsonFile method to get an RDD of Row objects for the whole file. Apart from using the whole Row object, you can also register this RDD as a table and select specific fields from it.
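A minimal sketch with the (older) HiveContext API described above; the input file and field names are hypothetical:

from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
# tweets = hiveCtx.jsonFile('tweets.json')     # RDD of Row objects
# tweets.registerTempTable('tweets')
# results = hiveCtx.sql('SELECT user.name, text FROM tweets')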
Spark can load data from any relational database that supports Java Database Connectivity (JDBC), including MySQL, Postgres, and other systems.
The Spark Cassandra connector is currently only available in Java and Scala.
When we normally pass functions to Spark, such as a map() function or a condition for filter(), they can use variables defined outside them in the driver program, but each task running on the cluster gets a new copy of each variable, and updates from these copies are not propagated back to the driver.
Accumulators provide a simple syntax for aggregating values from worker nodes back to the driver program.
In [38]:
rdd = sc.textFile('file:///usr/local/spark/README.md')
blank_lines = sc.accumulator(0)
def calculateLinesLength(x):
    global blank_lines
    length = len(x)
    if not length:
        blank_lines += 1
    return length
lines_length = rdd.map(calculateLinesLength)
lines_length.collect()[:10]
Out[38]:
In [39]:
blank_lines.value
Out[39]:
Accumulators work as follows: we create them in the driver by calling sc.accumulator(initialValue); worker code in Spark closures can add to them with +=; and the driver program can read the aggregated result through the value property.
Note that tasks on worker nodes cannot access the accumulator’s value()—from the point of view of these tasks, accumulators are write-only variables. This allows accumulators to be implemented efficiently, without having to communicate every update.
The end result is that for accumulators used in actions, Spark applies each task’s update to each accumulator only once. Thus, if we want a reliable absolute value counter, regardless of failures or multiple evaluations, we must put it inside an action like foreach().
For accumulators used in RDD transformations instead of actions, this guarantee does not exist.
Custom accumulators need to extend AccumulatorParam. Beyond adding to a numeric value, we can use any operation for add, provided that operation is commutative and associative.
In [40]:
from pyspark.accumulators import AccumulatorParam
class CustomAccumulatorParam(AccumulatorParam):
    def zero(self, initial):
        return initial
    def addInPlace(self, data1, data2):
        data1 += data2
        return data1
accum = sc.accumulator([], CustomAccumulatorParam())
accum
Out[40]:
Broadcast variables allow the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations.
In [41]:
# signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)
# countryContactCounts = contactCounts.map(processSignCount)
The process of using broadcast variables: create a Broadcast[T] by calling sc.broadcast() on an object of type T; access its value with the value property; the variable is sent to each node only once and should be treated as read-only (updates are not propagated to the other nodes).
When we are broadcasting large values, it is important to choose a data serialization format that is both fast and compact, because the time to send the value over the network can quickly become a bottleneck if it takes a long time to either serialize a value or to send the serialized value over the network.
Working with data on a per-partition basis allows us to avoid redoing setup work for each data item. Operations like opening a database connection or creating a random- number generator are examples of setup steps that we wish to avoid doing for each element. Spark has per-partition versions of map and foreach to help reduce the cost of these operations by letting you run code only once for each partition of an RDD.
Use mapPartitions() function, which gives us an iterator of the elements in each partition of the input RDD and expects us to return an iterator of our results.
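For example, a minimal sketch where any per-partition setup (such as opening a connection) would be done once per partition rather than once per element:

def sum_and_count(iterator):
    # per-partition setup would go here, once
    total, count = 0, 0
    for x in iterator:
        total += x
        count += 1
    yield (total, count)

nums = sc.parallelize([1, 2, 3, 4], 2)
nums.mapPartitions(sum_and_count).collect()   # e.g. [(3, 2), (7, 2)]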
Spark provides a general mechanism to pipe data to programs in other languages, like R scripts. Spark provides a pipe() method on RDDs. Spark’s pipe() lets us write parts of jobs using any language we want as long as it can read and write to Unix standard streams.
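For example, a minimal sketch piping each element through a standard Unix command (any external script available on the worker nodes could be used instead):

words = sc.parallelize(['panda', 'tiger', 'pink panda'])
words.pipe('grep panda').collect()   # ['panda', 'pink panda']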
Spark can run on a wide variety of cluster managers: its built-in Standalone cluster manager, Hadoop YARN, and Apache Mesos.
In distributed mode, Spark uses a master/slave architecture with one central coordinator and many distributed workers.
The central coordinator is called the driver.
The driver communicates with a potentially large number of distributed workers called executors.
The driver runs in its own Java process and each executor is a separate Java process.
A driver and its executors are together termed a Spark application.
A Spark application is launched on a set of machines using an external service called a cluster manager.
The driver is the process where the main() method of your program runs. It is the process running the user code that creates a SparkContext, creates RDDs, and performs transformations and actions.
When the driver runs, it performs two duties: converting the user program into tasks, and scheduling those tasks on executors.
The driver exposes information about the running Spark application through a web interface, which by default is available at port 4040.
Spark executors are worker processes responsible for running the individual tasks in a given Spark job.
The cluster manager is a pluggable component in Spark. This allows Spark to run on top of different external managers, such as YARN and Mesos, as well as its built-in Standalone cluster manager.
Spark provides a single script, called spark-submit, that you can use to submit your program to a cluster.
bin/spark-submit [options] {app jar | python file} [app options]
[options] are a list of flags for spark-submit. You can enumerate all possible flags by running spark-submit --help.
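For example, a generic submission might look like this (the master URL, resource sizes, and file names are placeholders):

bin/spark-submit \
  --master spark://masternode:7077 \
  --deploy-mode cluster \
  --executor-memory 2g \
  --total-executor-cores 8 \
  --name "My App" \
  my_script.py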
Since PySpark uses the existing Python installation on worker machines, you can install dependency libraries directly on the cluster machines using standard Python package managers (such as pip or easy_install), or via a manual installation into the site-packages/ directory of your Python installation. Alternatively, you can submit individual libraries using the --py-files argument to spark-submit and they will be added to the Python interpreter’s path.
For Java and Scala, it’s common practice to rely on a build tool to produce a single large JAR containing the entire transitive dependency graph of an application. The most popular build tools for Java and Scala are Maven and sbt (Scala build tool).
To use the cluster launch scripts, follow these steps: copy a compiled version of Spark to the same location on all your machines, set up passwordless SSH access from the master to the workers, list the worker hostnames in the master's conf/slaves file, start the cluster with sbin/start-all.sh on the master, and stop it with sbin/stop-all.sh.
To submit an application to the Standalone cluster manager, pass spark://masternode:7077 as the master argument to spark-submit.
spark-submit --master spark://masternode:7077 yourapp
Two deploy modes: client mode (the default, in which the driver runs on the machine where spark-submit was invoked) and cluster mode (in which the driver is launched within the Standalone cluster, on one of the worker nodes).
Resource allocation is controlled by two settings: the memory given to each executor (--executor-memory) and the maximum total number of cores used across all executors for an application (--total-executor-cores, or the spark.cores.max configuration property).
When running in production settings, you will want your Standalone cluster to be available to accept applications even if individual nodes in your cluster go down. Out of the box, the Standalone mode will gracefully support the failure of worker nodes. If you also want the master of the cluster to be highly available, Spark supports using Apache ZooKeeper (a distributed coordination system) to keep multiple standby masters and switch to a new one when any of them fails.
Running Spark on YARN in Hadoop environments is useful because it lets Spark access HDFS data quickly, on the same nodes where the data is stored.
Using YARN in Spark is straightforward: you set an environment variable that points to your Hadoop configuration directory, then submit jobs to a special master URL with spark-submit.
spark-submit --master yarn yourapp
Apache Mesos is a general-purpose cluster manager that can run both analytics workloads and long-running services (e.g., web applications or key/value stores) on a cluster.
To use Spark on Mesos, pass a mesos:// URI to spark-submit:
spark-submit --master mesos://masternode:5050 yourapp
Spark comes with a built-in script to launch clusters on Amazon EC2. This script launches a set of nodes and then installs the Standalone cluster manager on them.
The Spark EC2 script is called spark-ec2, and is located in the ec2 folder of your Spark installation.
Spark is designed so that default settings work “out of the box” in many cases; however, there are still some configurations users might want to modify.
The highest priority is given to configurations declared explicitly in the user's code using the set() function on a SparkConf object. Next are flags passed to spark-submit, then values in the properties file, and finally default values.
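For example, a minimal sketch of setting configuration in code with a SparkConf (the values are illustrative; a context is already running in this notebook, so creation is commented out):

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set('spark.app.name', 'My Spark App')
conf.set('spark.master', 'local[4]')
conf.set('spark.ui.port', '36000')   # override the default web UI port
# sc = SparkContext(conf=conf)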
To display the lineage of an RDD, Spark provides a toDebugString() method.
The following phases occur during Spark execution: user code defines a DAG (directed acyclic graph) of RDDs; actions force translation of the DAG to an execution plan of stages and tasks; and the tasks are scheduled and executed on the cluster.
Spark records detailed progress information and performance metrics as applications execute. These are presented to the user in two places: the Spark web UI and the logfiles produced by the driver and executor processes.
Spark’s built-in web UI - This is available on the machine where the driver is running at port 4040 by default. In the case of the YARN cluster mode, where the application driver runs inside the cluster, you should access the UI through the YARN ResourceManager, which proxies requests directly to the driver.
It helps in debugging the following things: the progress of running jobs, stages, and tasks (Jobs page); the amount of data persisted in memory and on disk (Storage page); per-executor metrics (Executors page); and the active configuration (Environment page).
Spark’s logging subsystem is based on log4j, a widely used Java logging library, and uses log4j’s configuration format. An example log4j configuration file is bundled with Spark at conf/log4j.properties.template. To customize Spark’s logging, first copy the example to a file called log4j.properties.
Spark offers two ways to tune the degree of parallelism for operations: give a degree of parallelism as a parameter to operations that shuffle data, or repartition an existing RDD with repartition() or coalesce().
When Spark is transferring data over the network or spilling data to disk, it needs to serialize objects into a binary format. Spark also supports the use of Kryo, a third-party serialization library that improves on Java’s serialization by offering both faster serialization times and a more compact binary representation, but cannot serialize all types of objects “out of the box.” Almost all applications will benefit from shifting to Kryo for serialization.
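For example, a minimal sketch of switching the internal serializer to Kryo through SparkConf (note that PySpark serializes user data with pickle; these settings affect the JVM side):

conf = SparkConf()
conf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
# optionally fail fast on classes that have not been registered with Kryo:
conf.set('spark.kryo.registrationRequired', 'true')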
Inside each executor, memory is used for a few purposes: RDD storage, shuffle and aggregation buffers, and user code.
By default Spark will leave 60% of space for RDD storage, 20% for shuffle memory, and the remaining 20% for user programs. In some cases users can tune these options for better performance.
The main parameters that affect cluster sizing are the amount of memory given to each executor, the number of cores for each executor, the total number of executors, and the number of local disks to use for scratch data.
Spark applications will benefit from having more memory and cores. Spark’s architecture allows for linear scaling; adding twice the resources will often make your application run twice as fast.
Spark SQL is Spark’s interface for working with structured and semistructured data. Structured data is any data that has a schema, that is, a known set of fields for each record.
Spark SQL provides three main capabilities: it can load data from a variety of structured sources (e.g., JSON, Hive, and Parquet); it lets you query the data using SQL, both inside a Spark program and from external tools that connect to Spark SQL through standard database connectors (JDBC/ODBC); and it provides rich integration between SQL and regular Python/Java/Scala code, including the ability to join RDDs and SQL tables and to expose custom functions in SQL.
The most powerful way to use Spark SQL is inside a Spark application. This gives us the power to easily load data and query it with SQL while simultaneously combining it with “regular” program code in Python, Java, or Scala.
User-defined functions, or UDFs, allow you to register custom functions in Python, Java, and Scala to call within SQL.
Spark SQL offers a built-in method to easily register UDFs by passing in a function in your programming language. In Scala and Python, we can use the native function and lambda syntax of the language, and in Java we need only extend the appropriate UDF class. In Python and Java we also need to specify the return type.
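For example, a minimal sketch registering a Python UDF that returns string length (assumes a HiveContext named hiveCtx and a registered table named tweets, as in the sketch above):

from pyspark.sql.types import IntegerType

hiveCtx.registerFunction('strLenPython', lambda x: len(x), IntegerType())
# lengths = hiveCtx.sql("SELECT strLenPython(text) FROM tweets LIMIT 10")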
Spark Streaming provides an abstraction called DStreams, or discretized streams. A DStream is a sequence of data arriving over time. Internally, each DStream is represented as a sequence of RDDs arriving at each time step (hence the name “discretized”). DStreams can be created from various input sources, such as Flume, Kafka, or HDFS. Once built, they offer two types of operations: transformations, which yield a new DStream, and output operations, which write data to an external system. DStreams provide many of the same operations available on RDDs, plus new operations related to time, such as sliding windows.
Transformations on DStreams can be grouped into either stateless or stateful:
Many of the RDD transformations are also available on DStreams.
Stateless transformations can also combine data from multiple DStreams, again within each time step.
The transform() operation lets you provide any arbitrary RDD-to-RDD function to act on the DStream.
Stateful transformations are operations on DStreams that track data across time; that is, some data from previous batches is used to generate the results for a new batch.
The two main types of stateful transformations are windowed operations, which act over a sliding window of time periods, and updateStateByKey(), which is used to track state across events for each key.
Stateful transformations require checkpointing to be enabled in your StreamingContext for fault tolerance.
Windowed operations compute results across a longer time period than the StreamingContext’s batch interval, by combining results from multiple batches.
All windowed operations need two parameters: the window duration and the sliding duration.
Both must be a multiple of the StreamingContext’s batch interval
The simplest window operation we can do on a DStream is window()
Spark Streaming provides a number of other windowed operations for efficiency and convenience.
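For example, a minimal sketch of a windowed count per key (assumes a DStream of (key, 1) pairs named pairs and a batch interval that divides both durations):

# windowed_counts = pairs.reduceByKeyAndWindow(
#     lambda x, y: x + y,   # add values entering the window
#     lambda x, y: x - y,   # subtract values leaving the window
#     30,                   # window duration in seconds
#     10)                   # slide duration in seconds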
To maintain state across the batches in a DStream, updateStateByKey() provides access to a state variable for DStreams of key/value pairs. Given a DStream of (key, event) pairs, it lets you construct a new DStream of (key, state) pairs by taking a function that specifies how to update the state for each key given new events.
To use updateStateByKey(), we provide a function update(events, oldState) that takes in the events that have arrived for a key and its previous state, and returns a newState to store for it.
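For example, a minimal sketch that keeps a running count per key (assumes a DStream of (key, 1) pairs named pairs and that checkpointing is enabled):

def update_running_count(new_values, current_count):
    if current_count is None:
        current_count = 0
    return current_count + sum(new_values)

# running_counts = pairs.updateStateByKey(update_running_count)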
Output operations specify what needs to be done with the final transformed data in a stream (e.g., pushing it to an external database or printing it to the screen).
Much like lazy evaluation in RDDs, if no output operation is applied on a DStream and any of its descendants, then those DStreams will not be evaluated. And if there are no output operations set in a StreamingContext, then the context will not start.
Spark Streaming has built-in support for a number of different data sources.
Each receiver runs as a long-running task within Spark’s executors, and hence occupies CPU cores allocated to the application. In addition, there need to be available cores for processing the data. This means that in order to run multiple receivers, you should have at least as many cores as the number of receivers, plus however many are needed to run your computation.
To run Spark Streaming applications 24/7, you need some special setup; the most important mechanism is checkpointing.
It allows Spark Streaming to periodically save data about the application to a reliable storage system, such as HDFS or Amazon S3.
Checkpointing serves two purposes: limiting the state that must be recomputed on failure (Spark can resume from the checkpointed data rather than recomputing the full lineage), and providing fault tolerance for the driver (a restarted driver can recover from the checkpoint and pick up where it left off).
Instead of simply calling new StreamingContext, we need to use the StreamingContext.getOrCreate() function.
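For example, a minimal sketch of driver recovery with getOrCreate() (the checkpoint directory and batch interval are illustrative):

from pyspark.streaming import StreamingContext

def create_streaming_context():
    ssc = StreamingContext(sc, 10)                       # 10-second batch interval
    ssc.checkpoint('hdfs:///tmp/streaming-checkpoint')   # enable checkpointing
    # define DStreams and output operations here
    return ssc

# ssc = StreamingContext.getOrCreate('hdfs:///tmp/streaming-checkpoint',
#                                    create_streaming_context)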
Spark Streaming applications have a few specialized tuning options: the batch and window sizes, the level of parallelism, and garbage collection and memory usage.
The best approach is to start with a larger batch size (around 10 seconds) and work your way down to a smaller batch size. If the processing times reported in the Streaming UI remain consistent, then you can continue to decrease the batch size, but if they are increasing you may have reached the limit for your application. In a similar way, for windowed operations, the interval at which you compute a result (i.e., the slide interval) has a big impact on performance. Consider increasing this interval for expensive computations if it is a bottleneck.
We can control the GC by adding -XX:+UseConcMarkSweepGC to the spark.executor.extraJavaOptions configuration.
MLlib is best suited for running each algorithm on a large dataset. If you instead have many small datasets on which you want to train different learning models, it would be better to use a single node learning library (e.g., Weka or SciKit-Learn) on each node, perhaps calling it in parallel across nodes using a Spark map().
Sparse vectors are usually preferable (both in terms of memory use and speed) if at most 10% of elements are nonzero.